In this notebook, we show how to load the custom library generated as part of the Twitter + Watson Tone Analyzer streaming application. The code can be found here: https://github.com/ibm-cds-labs/spark.samples/tree/master/streaming-twitter. The following cell uses a pre-built jar that has been posted on GitHub, but you can replace it with your own URL if needed.
In [1]:
%AddJar https://github.com/DTAIEB/demos/raw/master/streaming-twitter-assembly-1.5.jar -f
In [2]:
val demo = com.ibm.cds.spark.samples.StreamingTwitter
demo.setConfig("twitter4j.oauth.consumerKey","XXXXX")
demo.setConfig("twitter4j.oauth.consumerSecret","XXXXX")
demo.setConfig("twitter4j.oauth.accessToken","XXXXX")
demo.setConfig("twitter4j.oauth.accessTokenSecret","XXXXX")
demo.setConfig("watson.tone.url","https://gateway.watsonplatform.net/tone-analyzer-experimental/api")
demo.setConfig("watson.tone.password","XXXXX")
demo.setConfig("watson.tone.username","XXXXX")
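Rather than pasting credentials into the notebook, the values can be read from environment variables. The following is a minimal sketch under stated assumptions: the environment variable names and the `resolveConfig` helper are hypothetical and not part of the library; only the config keys come from the cell above.

```scala
// Hypothetical environment variable names mapped to the config keys used above.
val envToConfigKey: Seq[(String, String)] = Seq(
  "TWITTER_CONSUMER_KEY"        -> "twitter4j.oauth.consumerKey",
  "TWITTER_CONSUMER_SECRET"     -> "twitter4j.oauth.consumerSecret",
  "TWITTER_ACCESS_TOKEN"        -> "twitter4j.oauth.accessToken",
  "TWITTER_ACCESS_TOKEN_SECRET" -> "twitter4j.oauth.accessTokenSecret",
  "WATSON_TONE_USERNAME"        -> "watson.tone.username",
  "WATSON_TONE_PASSWORD"        -> "watson.tone.password"
)

// Returns the config entries found in `env`, plus the names of the
// environment variables that were not set.
def resolveConfig(env: Map[String, String]): (Map[String, String], Seq[String]) = {
  val found = envToConfigKey.collect {
    case (envName, key) if env.contains(envName) => key -> env(envName)
  }.toMap
  val absent = envToConfigKey.collect {
    case (envName, _) if !env.contains(envName) => envName
  }
  (found, absent)
}

val (settings, unset) = resolveConfig(sys.env)
if (unset.nonEmpty) println("Missing environment variables: " + unset.mkString(", "))

// In the notebook, the resolved values would then be applied with:
//   settings.foreach { case (key, value) => demo.setConfig(key, value) }
```

The `watson.tone.url` entry is not a secret, so it can stay as a literal `demo.setConfig` call.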
Start a new Twitter stream that collects live tweets and enriches them with Sentiment Analysis scores. The stream runs for the duration specified in the second argument of the startTwitterStreaming method. Note: if no duration is specified, the stream runs until the stopTwitterStreaming method is called.
In [3]:
import org.apache.spark.streaming._
demo.startTwitterStreaming(sc, Seconds(40))
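When the stream is started without a duration, it has to be stopped explicitly. One way to make sure that happens is a small wrapper that always calls the stop action after a wait. This is a sketch only: `runThenStop` is a hypothetical helper, and it assumes that the start action returns immediately rather than blocking.

```scala
// Hypothetical helper: runs `start`, waits `millis` milliseconds,
// then always calls `stop`, even if the wait is interrupted.
def runThenStop(start: () => Unit, stop: () => Unit, millis: Long): Unit = {
  start()
  try Thread.sleep(millis)
  finally stop()
}

// In the notebook, assuming startTwitterStreaming(sc) returns right away:
//   runThenStop(
//     () => demo.startTwitterStreaming(sc),
//     () => demo.stopTwitterStreaming,
//     40 * 1000L)
```

If `start` itself throws, `stop` is not called, since nothing was started.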
In [4]:
val (sqlContext, df) = demo.createTwitterDataFrames(sc)
In [5]:
val fullSet = sqlContext.sql("select * from tweets") //Select all columns
fullSet.show
In [6]:
fullSet.repartition(1).saveAsParquetFile("swift://notebooks.spark/tweetsFull.parquet")
In [7]:
val angerSet = sqlContext.sql("select author, text, Anger from tweets where Anger > 70")
println(angerSet.count)
angerSet.show
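To make the predicate in the query above concrete without a running stream, the same selection can be sketched on an in-memory collection. The `Tweet` case class, the `Double` score type, and the sample rows are all hypothetical and for illustration only; the real table has more columns.

```scala
// Hypothetical, simplified row type; the real `tweets` table has more columns.
case class Tweet(author: String, text: String, anger: Double)

// Mirrors: select author, text, Anger from tweets where Anger > 70
def angryTweets(tweets: Seq[Tweet], threshold: Double = 70.0): Seq[Tweet] =
  tweets.filter(_.anger > threshold)

// Made-up sample rows, not real data.
val sample = Seq(
  Tweet("alice", "so frustrated", 85.0),
  Tweet("bob", "nice day", 12.5),
  Tweet("carol", "borderline", 70.0)
)

val angry = angryTweets(sample)
println(angry.size)
angry.foreach(println)
```

The comparison is strict, so a tweet scoring exactly 70 is excluded, just as with `Anger > 70` in the SQL.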